Dall-e 3 prueba de usuario
Explorando IA, Datos y Tecnología
Uniones de difusión vs. Uniones sesgadas
Son tres puntos de entrada para interactuar con el motor de procesamiento:
Para configurar Spark Context, es necesario configurar directamente el Spark Master o usar SparkConf para una configuración detallada.
La sesión de Spark utiliza un patrón de constructor.
SparkContext gestiona los RDD directamente, mientras que SparkSession tenía un SparkContext integrado que gestiona estas interacciones.
SparkContext no admite DataFrames, solo SparkSession sí.
En Spark, las transformaciones estrechas son aquellas que no requieren reorganización. Esto aumenta el rendimiento, ya que las operaciones se pueden gestionar individualmente en sus propias particiones. Algunas de estas operaciones son: filtro, mapa y unión. Las uniones solo pueden ser una función estrecha si el conjunto de datos está particionado por la clave de unión.
Por el contrario, las operaciones que requieren reorganización entre trabajadores se denominan particiones amplias. Implican una penalización de rendimiento en tiempo de ejecución, pero a veces son inevitables. Las uniones, agrupar por clave y reducir por clave son operaciones que requieren reorganización de datos.
La evaluación diferida es una función de Spark que impide que las transformaciones se ejecuten hasta que se ejecute una acción. Gracias a Catalyst, busca la forma más eficiente de combinar las transformaciones.
Depende del tamaño del conjunto de datos. PySpark suele ser más rápido que Python porque puede distribuir la carga de trabajo en diferentes nodos y procesar los datos en paralelo. Sin embargo, existen...
Dado el siguiente archivo CSV:
desde pyspark.sql importar SparkSession
desdepyspark.sql.types importa StringType, DoubleType, StructType, IntegerType
from pyspark.sql.functions importa col, avg
spark = SparkSession.builder.getOrCreate()
schema = StructType() \
.add("id_usuario", IntegerType(), True) \
.add("id_producto", IntegerType(), True) \
.add("cantidad", DoubleType(), True) \
.add("ciudad", StringType(), True)
df_products = spark.read.options(header=True,delimiter=";",schema=schema).csv('r uta/al/archivo')
total_por_usuario = df_products.groupBy('usuario').sum('cantidad')
filtered_avg_city = df_products.groupBy('city').agg(avg('amount').alias(avg_amou nt)).filter(col('avg_amount' > 8000)
filtered_avg_city.cache()
user_id;name
1;'Wilmer'
2;'Jayson'
user_id;product_id;amount
1;101;12500.0
1;102;8900.0
2;103;3000.0
2;101;5000.0
de pyspark.sql import SparkSession
de pyspark.sql.functions import col, sum,sort
spark = SparkSession.builder.getOrCreate()
df_user = spark.read.options(header=True,delimiter=";",inferSchema=True).csv("ru ta/al/archivo")
df_transactions = spark.read.options(header=True,delimiter=";",inferSchema=True) .csv("ruta/al/archivo")
df_user_transactions = df_user.join(df_transactions,df_user.user_id == df_transactions.user_id,"inner")
df_total_por_usuario = df_user_transactions.groupBy(col("nombre")).agg(suma("can tidad")).sort(col("cantidad").desc
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sales_df = spark.createDataFrame([
("2024-01-01", "A", 10),
("2024-01-02", "A", 15),
("2024-01-03", "A", 7),
("2024-01-01", "B", 20),
("2024-01-02", "B", 5)
], ["fecha", "tienda", "ingresos"])
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as _sum,
windowSpec = Window.partitionBy("store").orderBy("date")
spark = SparkSession.builder.getOrCreate()
window_revenue = sales_df.withColumn("window_sum" ,̣_sum("revenue").over(windowSpec)) \
.withColumn("day_to_day_diff" ,col("revenue") - lag("revenue").over(windowSpec))
window_revenue .show()
spark.stop()
rdd = sc.parallelize([1, 2, 3, 4])
squared_rdd = rdd.map(lambda x: x**2)
squared_rdd.collect()
# salida
[1, 4, 9, 16]
rdd = sc.parallelize([1, 2, 3, 4], 2)
def sum_partition(iterator):
yield sum(iterator)
sum_rdd = rdd.mapPartitions(sum_partition)
sum_rdd.collect()
# salida
[3, 7]
rdd = sc.parallelize(["hola mundo", "¿cómo estás?") tú"])
# Define una función para dividir cada línea en palabras
def split_line(line):
return line.split(" ")
flat_rdd = rdd.flatMap(split_line)
flat_rdd.collect()
# output
['hello', 'world', 'how', 'are', 'you']
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import lit, rand, floor,concat
spark = SparkSession.builder.getOrCreate()
data = [
('A', 100),
('A', 200),
('A', 300),
('B', 10),
('C', 20)
]
df = spark.createDataFrame(data, ['key','value'])
# Número de sales
num_salts = 3
# Añadir sal aleatoria a la clave
df_salted = df.withColumn(
"salt", floor(rand() * num_salts)
).withColumn(
"salted_key", concat(col("key"), lit("_"), col("salt"))
)
df_salted.show()